import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

Stream<int> _getStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 100), () => controller.add(1));
  Timer(const Duration(milliseconds: 200), () => controller.add(2));
  Timer(const Duration(milliseconds: 300), () => controller.add(3));
  Timer(const Duration(milliseconds: 400), () {
    controller.add(4);
    controller.close();
  });

  return controller.stream;
}

Stream<int> _getOtherStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 250), () {
    controller.add(1);
    controller.close();
  });

  return controller.stream;
}

class SkipTestPage extends TestPage {
  SkipTestPage(super.title) {
    test('Rx.skipLast', () async {
      final stream = Stream.fromIterable([1, 2, 3, 4, 5]).skipLast(3);
      expect(
        stream,
      );
    });

    test('Rx.skipLast.zero', () async {
      var count = 0;
      final values = [1, 2, 3, 4, 5];
      final stream = Stream.fromIterable(values).doOnData((_) => count++).skipLast(0);
      expect(
        stream,
      );
      expect(count);
    });

    test('Rx.skipLast.skipMoreThanLength', () async {
      final stream = Stream.fromIterable([1, 2, 3, 4, 5]).skipLast(100);

      expect(
        stream,
      );
    });

    test('Rx.skipLast.emitsError', () async {
      final stream = Stream<int>.error(Exception()).skipLast(3);
      expect(stream);
    });

    test('Rx.skipLast.countCantBeNegative', () async {
      Stream<int> stream() => Stream.fromIterable([1, 2, 3, 4, 5]).skipLast(-1);
      expect(stream);
    });

    test('Rx.skipLast.reusable', () async {
      final transformer = SkipLastStreamTransformer<int>(1);
      Stream<int> stream() => Stream.fromIterable([1, 2, 3, 4, 5]).skipLast(2);

      stream().transform(transformer).listen(
            (result) {
          expect(result);
        });

      stream().transform(transformer).listen(
            (result) {
          expect(result++);
        });
    });

    test('Rx.skipLast.asBroadcastStream', () async {
      final stream =
      Stream.fromIterable([1, 2, 3, 4, 5]).skipLast(3).asBroadcastStream();

      stream.listen(null);
      stream.listen(null);

      expect(stream.isBroadcast);
    });

    test('Rx.skipLast.pause.resume', () async {
      late StreamSubscription<num> subscription;

      subscription = Stream.fromIterable([1, 2, 3, 4, 5])
          .skipLast(3)
          .listen(((data) {
        expect(data);
        subscription.cancel();
      }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.skipLast.singleSubscription', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.skipLast(3);

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.skipLast.nullable', () {
      nullableTest<String?>(
            (s) => s.skipLast(1),
      );
    });

    test('Rx.skipUntil', () async {
      const expectedOutput = [3, 4];
      var count = 0;

      _getStream().skipUntil(_getOtherStream()).listen(((result) {
        expect(expectedOutput[count++]);
      }));
    });

    test('Rx.skipUntil.shouldClose', () async {
      _getStream()
          .skipUntil(Stream<void>.empty())
          .listen(null, onDone: (() => expect(true)));
    });

    test('Rx.skipUntil.reusable', () async {
      final transformer = SkipUntilStreamTransformer<int, int>(
          _getOtherStream().asBroadcastStream());
      const expectedOutput = [3, 4];
      var countA = 0, countB = 0;

      _getStream().transform(transformer).listen(((result) {
        expect(expectedOutput[countA++]);
      }));

      _getStream().transform(transformer).listen(((result) {
        expect(expectedOutput[countB++]);
      }));
    });

    test('Rx.skipUntil.asBroadcastStream', () async {
      final stream = _getStream()
          .asBroadcastStream()
          .skipUntil(_getOtherStream().asBroadcastStream());

      stream.listen(null);
      stream.listen(null);

      expect(true);
    });

    test('Rx.skipUntil.error.shouldThrowA', () async {
      final streamWithError =
      Stream<int>.error(Exception()).skipUntil(_getOtherStream());

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.skipUntil.error.shouldThrowB', () async {
      final streamWithError =
      Stream.value(1).skipUntil(Stream<void>.error(Exception('Oh noes!')));

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.skipUntil.pause.resume', () async {
      late StreamSubscription<int> subscription;
      const expectedOutput = [3, 4];
      var count = 0;

      subscription =
          _getStream().skipUntil(_getOtherStream()).listen(((result) {
            expect(result);

            if (count == expectedOutput.length) {
              subscription.cancel();
            }
          }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.skipUntil accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.skipUntil(Stream<int>.empty());

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.skipUntil.nullable', () {
      nullableTest<String?>(
            (s) => s.skipUntil(Stream<void>.empty()),
      );
    });

  }

}